package com.lzm.toolfx.module.aio;

import com.alibaba.fastjson.JSONObject;
import com.lzm.toolfx.common.Property;
import com.lzm.toolfx.module.notiy.FakeSocket;
import com.lzm.toolfx.module.notiy.NotifyServer;
import com.lzm.toolfx.module.notiy.NotifyType;
import com.lzm.toolfx.utils.ByteUtils;
import com.lzm.toolfx.utils.DateUtil;
import javafx.application.Platform;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;

/*******************************************************************
  *                       MsgHandler异步消息处理                      *
  *  AsynchronousSocketChannel : Socket通道                          *
  *  AioServer :  服务端                                             *
  *  NotifyServer : 通知 MainController Socket连接和断开服务          *
  *                                                                 *
  *  function:                                                      *
  *     ->read  : 读取Socket接收的 ByteBuffer                        *
  *     ->write : 写入数据到Socket                                   *
 *******************************************************************/

/**
 * Completion handler for connections accepted by {@link AioServer}.
 *
 * <p>For every accepted channel it notifies the server's socket observer of the
 * connection, keeps reading from the channel, optionally decodes the received
 * bytes (when {@code Property.MQAvailable} is set) and reports the number of
 * bytes received. When the peer closes the connection or a read fails, the
 * channel is closed and the observer is told that the connection is gone.
 */
public class MsgHandler implements CompletionHandler<AsynchronousSocketChannel, AioServer> {

    /** Capacity in bytes of the receive buffer allocated once per connection. */
    private static final int READ_BUFFER_SIZE = 409600;

    /**
     * Constant subtracted from in the distance estimate.
     * NOTE(review): presumably the reference RSSI of the tag hardware - confirm.
     */
    private static final int RSSI_REFERENCE = -41;

    /**
     * Divisor of the exponent in the distance estimate (10 * 2 in the original formula).
     * Kept as a double so the exponent is not truncated by integer division.
     */
    private static final double RSSI_DIVISOR = 10 * 2.0;

    // TODO: a message queue sender (MsServer / Property.msmq_ip / Property.msmq_queueName)
    //       was sketched here originally but is not wired in.

    /**
     * Called when a new client connection has been accepted.
     *
     * @param asc        the channel of the newly accepted client
     * @param attachment the server that issued the accept
     */
    @Override
    public void completed(AsynchronousSocketChannel asc, AioServer attachment) {
        // Re-arm accept before touching the new channel: a failure while setting up
        // this connection must not stop the server from accepting further clients.
        attachment.getSocketChannel().accept(attachment, this);
        try {
            String remoteSocket = asc.getRemoteAddress().toString();
            NotifyServer server = new NotifyServer();
            server.registerObserver(attachment.getSocketObserver());
            Platform.runLater(() ->
                    System.out.println(DateUtil.now() + "连接: " + remoteSocket + "\n" + Property.dash));
            server.setNotify(NotifyType.IS_CONNECT, new FakeSocket(remoteSocket, true));
            // TODO: a periodic NotifyType.SIZE heartbeat (every 60 s) was sketched here originally.
            read(asc, attachment);
        } catch (IOException e) {
            // The connection is unusable (its remote address could not be resolved):
            // report it and release the channel instead of silently dropping it.
            e.printStackTrace();
            closeQuietly(asc);
        }
    }

    /**
     * Starts the read loop for one connection.
     *
     * <p>A single buffer and a single handler are reused for every read on the
     * channel; the next read is issued only after the previous one completed.
     *
     * @param asc       the client channel to read from
     * @param aioServer the owning server, used to obtain the socket observer
     * @throws IOException if the remote address of the channel cannot be obtained
     */
    private void read(final AsynchronousSocketChannel asc, AioServer aioServer) throws IOException {
        final String remoteSocket = asc.getRemoteAddress().toString();
        final ByteBuffer buf = ByteBuffer.allocate(READ_BUFFER_SIZE);
        asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer resultSize, ByteBuffer attachment) {
                // A negative size means the peer closed the connection.
                if (resultSize < 0) {
                    closeQuietly(asc);
                    notifyClosed(aioServer, remoteSocket);
                    return;
                }
                NotifyServer server = new NotifyServer();
                server.registerObserver(aioServer.getSocketObserver());
                // Switch the buffer from filling to draining before decoding.
                attachment.flip();
                // NOTE(review): each read is decoded on its own; a frame split across
                // two reads is not reassembled.
                if (Property.MQAvailable) {
                    decode(attachment, aioServer);
                }
                server.setNotify(NotifyType.SIZE,
                        new FakeSocket(remoteSocket, resultSize, System.currentTimeMillis()));
                // Reuse the same buffer and handler for the next read.
                attachment.clear();
                asc.read(attachment, attachment, this);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
                // No further read is issued after a failure, so the connection is dead
                // from this handler's point of view: release it and tell the observers.
                closeQuietly(asc);
                notifyClosed(aioServer, remoteSocket);
            }
        });
    }

    /**
     * Reports a closed connection to the server's socket observer on the JavaFX thread.
     *
     * @param aioServer    the owning server, used to obtain the socket observer
     * @param remoteSocket textual remote address of the closed connection
     */
    private void notifyClosed(AioServer aioServer, String remoteSocket) {
        NotifyServer server = new NotifyServer();
        server.registerObserver(aioServer.getSocketObserver());
        Platform.runLater(() -> {
            System.out.println(DateUtil.now() + "关闭: " + remoteSocket + "\n" + Property.dash);
            server.setNotify(NotifyType.IS_CLOSE, new FakeSocket(remoteSocket, false));
        });
    }

    /**
     * Closes a channel, reporting (but not propagating) any failure to close.
     *
     * @param asc the channel to close
     */
    private static void closeQuietly(AsynchronousSocketChannel asc) {
        try {
            asc.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes a response to the channel, blocking until every byte has been written.
     *
     * @param asc      the client channel to write to
     * @param response the text to send (encoded with the platform default charset,
     *                 as before)
     */
    private void write(AsynchronousSocketChannel asc, String response) {
        // Wrap the encoded bytes directly so responses of any length fit.
        ByteBuffer buf = ByteBuffer.wrap(response.getBytes());
        try {
            // A single write may transfer only part of the buffer.
            while (buf.hasRemaining()) {
                asc.write(buf).get();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    /**
     * Called when accepting a connection failed.
     *
     * @param exc        the cause of the failure
     * @param attachment the server that issued the accept
     */
    @Override
    public void failed(Throwable exc, AioServer attachment) {
        // TODO: handle accept failures
        exc.printStackTrace();
    }

    /**
     * Decodes one received frame and prints every tag record it contains as JSON
     * to {@code System.err}.
     *
     * <p>Layout as read by this method: 2-byte head, 2-byte length, 2-byte device id,
     * 2-byte command sequence, 1-byte record count, then per record a 4-byte tag id,
     * 1-byte parameter, 1-byte signed signal strength and 2 trailing bytes.
     * Head and length are consumed but not validated.
     *
     * @param buffer the received bytes, positioned at the start of the frame
     * @param server the owning server (currently unused)
     */
    private void decode(ByteBuffer buffer, AioServer server) {
        byte[] deviceId = new byte[2];
        try {
            skip(buffer, 2);               // head
            skip(buffer, 2);               // length
            buffer.get(deviceId);
            skip(buffer, 2);               // command sequence number
            // The count is a single byte; mask it so values above 127 do not turn negative.
            int count = buffer.get() & 0xFF;
            for (int i = 0; i < count; i++) {
                byte[] tagId = new byte[4];
                buffer.get(tagId);
                skip(buffer, 1);           // parameter byte
                int signal = buffer.get(); // signed on purpose
                skip(buffer, 2);           // record terminator

                JSONObject msg = new JSONObject();
                msg.put("signal", signal);
                msg.put("tagId", ByteUtils.Hex2int(tagId));
                msg.put("deviceId", ByteUtils.Hex2int(deviceId));
                // Floating-point division: integer division truncated the exponent.
                msg.put("distance", Math.pow(10, (RSSI_REFERENCE - signal) / RSSI_DIVISOR));
                msg.put("time", System.currentTimeMillis());
                System.err.println(msg.toJSONString());
                // TODO: forward msg (device id, tag id, signal, distance, time) to the message queue.
            }
        } catch (Exception e) {
            // A truncated or malformed frame ends up here; report it instead of hiding it.
            System.err.println("decode failed: " + e);
        }
    }

    /**
     * Advances the buffer position past {@code n} bytes that are not needed.
     *
     * @param buffer the buffer to advance
     * @param n      number of bytes to skip
     * @throws IllegalArgumentException if fewer than {@code n} bytes remain
     */
    private static void skip(ByteBuffer buffer, int n) {
        buffer.position(buffer.position() + n);
    }
}

